
perf: improve bigtableio.WriteBatch performance by approx. 20% due to non blocking batch #24364

Conversation

@capthiron (Contributor) commented on Nov 27, 2022

Relates to issue #23324.

Further testing with the changes from #23411 showed that larger batch-processed datasets in particular (tested with datasets of around 110 million rows) profited from removing the GroupByKey step, which makes the actual WriteBatch process non-blocking and up to 20% faster. The gain comes from the bounded nature of batch-processed PCollections: the GroupByKey completely blocked writing to Bigtable until the whole collection had been keyed. The changes have been fully tested. Maybe there is still a chance they make it into the initial introduction of bigtableio in release-2.44.0.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md


See CI.md for more information about GitHub Actions CI.

- introduce a MutationBatcher for WriteBatch that does not require a keyed PCollection and therefore does not have to wait for the GroupByKey step, which is blocking in a batch pipeline due to the bounded nature of the input data (a minimal sketch of this pattern follows below)
- achieves an approximate 20% performance increase, especially for larger datasets
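
A minimal sketch (not the PR's actual code) of the non-keyed batching pattern described above. The names mutationBatcherFn, flush and maxBatchSize are hypothetical, and connection setup, coder/DoFn registration and retries are omitted; the real bigtableio code may differ:

package bigtableio_sketch

import (
	"context"

	"cloud.google.com/go/bigtable"
)

// maxBatchSize is an assumed flush threshold, not a value taken from the PR.
const maxBatchSize = 1000

// mutationBatcherFn buffers mutations per bundle and writes them with a single
// bulk RPC, so no keyed PCollection and no GroupByKey are required.
type mutationBatcherFn struct {
	table *bigtable.Table // assumed to be opened in Setup/StartBundle (omitted)

	rowKeys   []string
	mutations []*bigtable.Mutation
}

// ProcessElement appends the incoming mutation to the buffer and flushes once
// the buffer reaches maxBatchSize.
func (f *mutationBatcherFn) ProcessElement(ctx context.Context, rowKey string, mut *bigtable.Mutation) error {
	f.rowKeys = append(f.rowKeys, rowKey)
	f.mutations = append(f.mutations, mut)
	if len(f.mutations) >= maxBatchSize {
		return f.flush(ctx)
	}
	return nil
}

// FinishBundle flushes whatever is left when the bundle ends, so the write
// never waits on an upstream GroupByKey to finish keying the whole collection.
func (f *mutationBatcherFn) FinishBundle(ctx context.Context) error {
	return f.flush(ctx)
}

func (f *mutationBatcherFn) flush(ctx context.Context) error {
	if len(f.mutations) == 0 {
		return nil
	}
	// ApplyBulk applies the buffered mutations in one batched request.
	errs, err := f.table.ApplyBulk(ctx, f.rowKeys, f.mutations)
	if err != nil {
		return err
	}
	for _, e := range errs {
		if e != nil {
			return e
		}
	}
	f.rowKeys, f.mutations = nil, nil
	return nil
}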
@capthiron (Contributor, Author)

R: @lostluck

@github-actions (bot)

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

codecov bot commented on Nov 27, 2022

Codecov Report

Merging #24364 (aa869f3) into master (fd7e862) will decrease coverage by 0.00%.
The diff coverage is 3.03%.

@@            Coverage Diff             @@
##           master   #24364      +/-   ##
==========================================
- Coverage   73.36%   73.35%   -0.01%     
==========================================
  Files         718      718              
  Lines       97034    97036       +2     
==========================================
- Hits        71188    71185       -3     
- Misses      24516    24520       +4     
- Partials     1330     1331       +1     
Flag Coverage Δ
go 51.44% <3.03%> (-0.02%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
sdks/go/pkg/beam/io/bigtableio/bigtable.go 27.65% <3.03%> (-0.40%) ⬇️
sdks/go/pkg/beam/core/metrics/dumper.go 49.20% <0.00%> (-4.77%) ⬇️


@lostluck (Contributor) left a comment


I'm going to take a good look at this tomorrow, but I do need to note that not having the GBK in the IO itself prevents the IO from controlling its checkpointing behavior.
E.g. if the writes and internal retries fail, the runner would generally restart that stage from scratch from the last checkpoint, which is typically the previous GBK (or stateful DoFn).

This would essentially redo any computations before the IO, which could lead to duplicates and similar concerns. See the (admittedly meager) guidance for sinks here: https://beam.apache.org/documentation/io/developing-io-overview/#sinks

Just making sure the trade-off in this case is understood; we should make it clear in the documentation.
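
For context, a minimal sketch (assumed names, not code from this PR or the SDK) of what keeping a GBK in front of the write looks like in the Go SDK, so the write stage has a checkpoint barrier to retry from; Mutation here is a stand-in element type and addKeyedWrite is hypothetical:

package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// Mutation stands in for the element type being written; only RowKey matters here.
type Mutation struct {
	RowKey string
}

// addKeyedWrite wires a GroupByKey in front of the write so the runner has a
// checkpoint barrier: if writes (and their internal retries) fail, the stage
// can be retried from the grouped data instead of recomputing everything
// upstream. DoFn registration is omitted for brevity.
func addKeyedWrite(s beam.Scope, mutations beam.PCollection, writeGroupedFn interface{}) {
	keyed := beam.ParDo(s, func(m Mutation) (string, Mutation) {
		return m.RowKey, m // key by row key; a coarser shard id would also work
	}, mutations)
	grouped := beam.GroupByKey(s, keyed)
	beam.ParDo0(s, writeGroupedFn, grouped) // the write DoFn iterates each key's values
}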

@capthiron (Contributor, Author) commented on Nov 29, 2022

Unfortunately we are currently facing scalability issues with both this version and the previous one (with the GroupByKey step) when processing in batch mode. Do you have any idea why this approach would not scale? We recently ran a comparison with the Python-based Bigtable connector writing data gathered from BigQuery, which in general was slower but took advantage of Dataflow's autoscaling. It scaled up to 28 workers and throughput grew roughly linearly as well, which reduced the duration of the import of over 110 million rows by a factor of 10. Do you have any ideas where the problem could be?

I wondered whether structs like this could be a problem:

type writeBatchFn struct {
	// Project is the project
	Project string `json:"project"`
	// InstanceID is the bigtable instanceID
	InstanceID string `json:"instanceId"`
	// Client is the bigtable.Client
	client *bigtable.Client `json:"-"`
	// TableName is the qualified table identifier.
	TableName string `json:"tableName"`
	// Table is a bigtable.Table instance with an eventual open connection
	table *bigtable.Table `json:"-"`
	// MutationBatcher is responsible for collecting and writing mutations as batches to bigtable
	MutationBatcher MutationBatcher `json:"mutationBatcher"`
	// Type is the encoded schema type.
	Type beam.EncodedType `json:"type"`
}

I am also not sure whether the BigQuery source may already be the cause of the non-scalability. When I took a deeper look into the Python implementation of the BigQuery connector, I found that, unlike the Go one, it utilises a split function when reading data. From my understanding this gives you unbounded output that could help with parallelisation, but I am not sure about that and not really familiar with the core workings of these scaling operations. So these are only wild guesses 🙆‍♂️🥲

@capthiron marked this pull request as draft on November 29, 2022, 15:54
@lostluck (Contributor) commented on Nov 29, 2022

If it's the "Go native BigQuery" source and not the xlang BigQuery source, that's the problem. The Go native bigqueryio doesn't currently scale and has not been performance vetted. It's extremely old and needs a rewrite for scalability, since it pre-dates the scaling mechanism. See below the break for explanations.

If you can, try again with the xlang BigQuery IO, which uses the Java one under the hood. It'll download jars and spin up an expansion service to add it to your pipeline. Assuming it works, it will do all the scaling and such as needed. If there are problems, please file an issue so we can fix them.


So, because this is a sink, scaling is constrained by whatever the source of the data is. If you have a well-authored SplittableDoFn as the source, or a GBK, Dataflow can dynamically split.

E.g. if you have
Impulse -> Basic DoFn -> Write

all of this will stay on a single machine, and scaling won't help (it doesn't scale). This is currently how the "native" one is written, because SDFs didn't exist in the Go SDK at that point.

The basic workaround is the GBK.

Impulse -> Basic DoFn -> GBK -> Write

This allows the post-GBK part to scale based on key parallelism. But until the GBK, everything runs on a single machine from the impulse.

Impulse -> SplittableDoFn -> Write

Then the runner (like Dataflow) can split and basically allow sharding of that work. This is how the current textio read works (the Write needs more work, though).

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/textio/textio.go#L128

E.g. textio implements its read composite by first finding all the files for a glob, then getting the sizes for all of them (so it's filename + byte-size pairs), and then uses a splittable DoFn so that each element can be sub-split based on the restrictions (e.g. in case a single file is very, very large).

You can learn more about SDFs in the programming guide.
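
For illustration, a rough sketch (hypothetical names, not the actual textio code) of the splittable-DoFn shape described above, where each element carries a size and the runner can sub-split the offset-range restriction to shard the work:

package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
)

// readShardFn is a hypothetical splittable DoFn over (name, size) pairs.
type readShardFn struct{}

// CreateInitialRestriction covers the whole element, e.g. a file of `size` bytes.
func (fn *readShardFn) CreateInitialRestriction(_ string, size int64) offsetrange.Restriction {
	return offsetrange.Restriction{Start: 0, End: size}
}

// SplitRestriction pre-splits the restriction into fixed-size chunks so the
// runner can distribute them across workers.
func (fn *readShardFn) SplitRestriction(_ string, _ int64, rest offsetrange.Restriction) []offsetrange.Restriction {
	return rest.SizedSplits(64 * 1024 * 1024)
}

// RestrictionSize tells the runner how much work each restriction represents,
// which is what enables dynamic splitting of long-running elements.
func (fn *readShardFn) RestrictionSize(_ string, _ int64, rest offsetrange.Restriction) float64 {
	return rest.Size()
}

// CreateTracker wraps the restriction in a thread-safe tracker.
func (fn *readShardFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
}

// ProcessElement claims offsets from the tracker and emits work for each one;
// whatever is not yet claimed can be handed back to the runner as a split.
func (fn *readShardFn) ProcessElement(rt *sdf.LockRTracker, name string, _ int64, emit func(string, int64)) error {
	rest := rt.GetRestriction().(offsetrange.Restriction)
	for i := rest.Start; rt.TryClaim(i); i++ {
		emit(name, i)
	}
	return nil
}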

@capthiron (Contributor, Author)

After reconsidering that scaling is constrained by the nature of the given data source, I think this optimisation, with its drawbacks, is actually not the way to go.
Unfortunately we were not able to apply the xlang workaround. In the case of bigqueryio we should just wait for a reworked native solution, or even consider trying to implement it ourselves.
Nevertheless, I'll close this PR for now. Thanks for the support! :-)

@capthiron closed this on Dec 6, 2022
@lostluck (Contributor) commented on Dec 8, 2022

It would be extremely useful to know what else you were running into that blocked use of the cross-language version of bigqueryio. We can't fix what we don't know about, and "it didn't work" isn't a lot to go on. I know there was the "alias" coder thing. Was that it?

Please spare a moment if you can.

@capthiron (Contributor, Author) commented on Dec 13, 2022

Sorry for not being more descriptive. I'll see whether I can show you some code snippets explaining the approach taken and what errors it resulted in. For confidentiality reasons I'll first have to construct an equivalent job, because the testing took place in a business-related, confidential codebase.
I'll be back with hopefully better information on this soon. Thanks for your patience :))

@lostluck (Contributor)

Understood! That's way more than I expected! I can wait.

I was expecting something like: our table uses feature foo, or partitioning blah, and we got X error. The BigQuery API is non-trivial, and with xlang we end up needing another intermediary layer. However, the bugs are most likely to be in the Go SDK's construction or in how it's receiving data from Java.
